
Database Practical Examples

This document provides hands-on examples and code snippets for various database operations and scenarios.

SQL Examples

Basic CRUD Operations

Create Table with Constraints

-- Create departments table first: employees declares a FOREIGN KEY to
-- departments(id), so departments must already exist when employees is created
-- (the original script created employees first and would fail on a fresh schema).
CREATE TABLE departments (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    location VARCHAR(100),
    budget DECIMAL(12,2),
    -- NOTE(review): intended as an FK to employees(id); it cannot be declared
    -- here without a circular dependency -- add it later via ALTER TABLE.
    manager_id INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create a comprehensive employees table
-- (MySQL dialect: AUTO_INCREMENT, ON UPDATE CURRENT_TIMESTAMP)
CREATE TABLE employees (
    id INT PRIMARY KEY AUTO_INCREMENT,
    employee_id VARCHAR(10) UNIQUE NOT NULL,   -- business key, distinct from surrogate id
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    phone VARCHAR(20),
    hire_date DATE NOT NULL,
    salary DECIMAL(10,2) CHECK (salary > 0),   -- exact decimal for money; CHECK enforced in MySQL 8.0.16+
    department_id INT,
    manager_id INT,                            -- self-reference for the org hierarchy
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    -- named constraints keep errors and migrations greppable
    CONSTRAINT employees_department_id_fk FOREIGN KEY (department_id) REFERENCES departments(id),
    CONSTRAINT employees_manager_id_fk FOREIGN KEY (manager_id) REFERENCES employees(id)
);

Advanced Queries

-- Complex join with aggregation: per-department salary statistics for
-- well-funded departments with more than five employees.
-- NOTE(review): the original used LEFT JOIN, but HAVING COUNT(e.id) > 5
-- already discards departments with no matching employees, so INNER JOIN
-- is equivalent here and states the intent directly.
SELECT
    d.name AS department_name,
    COUNT(e.id) AS employee_count,
    AVG(e.salary) AS average_salary,
    MAX(e.salary) AS highest_salary,
    MIN(e.salary) AS lowest_salary
FROM departments d
INNER JOIN employees e ON d.id = e.department_id
WHERE d.budget > 100000          -- row filter before grouping
GROUP BY d.id, d.name            -- group by the key, not just the (non-unique) name
HAVING COUNT(e.id) > 5           -- group filter after aggregation
ORDER BY average_salary DESC;

-- Window functions for ranking
-- ROW_NUMBER is unique within its partition; RANK leaves gaps on salary
-- ties; DENSE_RANK does not. LAG/LEAD look one row back/ahead in the
-- ORDER BY salary DESC ordering (NULL at the edges).
SELECT
first_name,
last_name,
salary,
department_id,
ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC) AS dept_rank,
RANK() OVER (ORDER BY salary DESC) AS overall_rank,
DENSE_RANK() OVER (ORDER BY salary DESC) AS dense_rank,
LAG(salary, 1) OVER (ORDER BY salary DESC) AS prev_salary,
LEAD(salary, 1) OVER (ORDER BY salary DESC) AS next_salary
FROM employees
ORDER BY salary DESC;

-- Recursive CTE for hierarchical data
-- Walks the manager_id self-reference from top-level managers downward,
-- tracking each employee's depth ("level") in the org chart.
WITH RECURSIVE employee_hierarchy AS (
-- Base case: top-level managers
SELECT id, first_name, last_name, manager_id, 0 AS level
FROM employees
WHERE manager_id IS NULL

UNION ALL

-- Recursive case: subordinates
SELECT e.id, e.first_name, e.last_name, e.manager_id, eh.level + 1
FROM employees e
JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
-- REPEAT(' ', level) indents each name by one space per hierarchy level
CONCAT(REPEAT(' ', level), first_name, ' ', last_name) AS hierarchy,
level
FROM employee_hierarchy
ORDER BY level, first_name;

Indexing Strategies

-- Create composite index for common query patterns
-- (leading column department_id serves equality filters; salary DESC helps ORDER BY)
CREATE INDEX idx_employee_dept_salary ON employees(department_id, salary DESC);

-- Create covering index so the query below can be answered from the index alone
CREATE INDEX idx_employee_covering ON employees(department_id, salary, first_name, last_name);

-- Partial index for specific conditions.
-- NOTE(review): the WHERE clause on CREATE INDEX is PostgreSQL syntax;
-- MySQL (the dialect used elsewhere in this file) does not support partial indexes.
CREATE INDEX idx_high_salary_employees ON employees(salary) WHERE salary > 100000;

-- Functional index on the email domain.
-- MySQL 8.0.13+ requires a functional key part to be wrapped in its own
-- parentheses, hence the double parens (the original single-paren form errors).
CREATE INDEX idx_employee_email_domain ON employees((SUBSTRING(email, LOCATE('@', email) + 1)));

-- Analyze index usage for a representative query
EXPLAIN FORMAT=JSON
SELECT first_name, last_name, salary
FROM employees
WHERE department_id = 1 AND salary > 50000;

NoSQL Examples

MongoDB Operations

Document Structure and Queries

// Insert documents with embedded data
// NOTE(review): _id is supplied explicitly here; MongoDB would auto-generate
// one if the field were omitted.
db.employees.insertMany([
{
_id: ObjectId(),
employee_id: "EMP001",
personal_info: {
first_name: "John",
last_name: "Doe",
email: "john.doe@company.com",
phone: "+1-555-0123",
},
employment: {
hire_date: ISODate("2023-01-15"),
salary: 75000,
department: "Engineering",
position: "Software Engineer",
},
skills: ["JavaScript", "Python", "React", "Node.js"],
projects: [
{
name: "E-commerce Platform",
role: "Lead Developer",
start_date: ISODate("2023-02-01"),
end_date: ISODate("2023-08-31"),
},
],
performance: {
rating: 4.5,
goals_met: 8,
goals_total: 10,
},
},
]);

// Complex aggregation pipeline
db.employees.aggregate([
// Match employees in Engineering department
{ $match: { "employment.department": "Engineering" } },

// Unwind skills array to analyze individual skills
// ($unwind emits one document per skill, so each employee's salary
// contributes once per skill to the per-skill stats below)
{ $unwind: "$skills" },

// Group by skill and calculate statistics
{
$group: {
_id: "$skills",
count: { $sum: 1 },
avg_salary: { $avg: "$employment.salary" },
max_salary: { $max: "$employment.salary" },
min_salary: { $min: "$employment.salary" },
},
},

// Sort by count descending
{ $sort: { count: -1 } },

// Limit to top 10 skills
{ $limit: 10 },
]);

// Text search with scoring
// NOTE(review): $text requires a text index on the searched fields, e.g.
// db.employees.createIndex({ skills: "text", "employment.position": "text" })
db.employees
.find(
{ $text: { $search: "JavaScript React Node.js" } },
{
score: { $meta: "textScore" },
"personal_info.first_name": 1,
"personal_info.last_name": 1,
"employment.position": 1,
skills: 1,
}
)
.sort({ score: { $meta: "textScore" } });

// Geospatial queries (if location data exists)
// NOTE(review): $near requires a 2dsphere index on `location`;
// GeoJSON coordinates are [longitude, latitude] in that order.
db.employees.find({
location: {
$near: {
$geometry: {
type: "Point",
coordinates: [-74.0059, 40.7128], // New York coordinates
},
$maxDistance: 10000, // 10km radius
},
},
});

Redis Operations

Caching Patterns

import redis
import json
import time

# Connect to Redis
# NOTE(review): without decode_responses=True, r.get returns bytes;
# json.loads accepts bytes, so the cache helpers below still work.
r = redis.Redis(host='localhost', port=6379, db=0)

# Cache-aside pattern: read from cache, fall back to the database on a miss.
def get_employee(employee_id):
    """Return one employee record, served from Redis when cached.

    Falls back to the database on a cache miss and repopulates the cache
    with a 1-hour TTL. (The original snippet had its indentation stripped
    and was not valid Python; logic is unchanged.)
    NOTE(review): `r` (Redis client) and `database` are module-level
    globals assumed to be defined elsewhere in this script.
    """
    # Try to get from cache first
    cached_data = r.get(f"employee:{employee_id}")
    if cached_data:
        return json.loads(cached_data)

    # If not in cache, read through to the database
    employee_data = database.get_employee(employee_id)

    # Store in cache with a 1-hour expiration
    r.setex(f"employee:{employee_id}", 3600, json.dumps(employee_data))

    return employee_data

# Write-through pattern: every write updates the database and the cache together.
def update_employee(employee_id, data):
    """Persist `data` to the database and refresh the cached copy (1h TTL).

    NOTE(review): indentation restored -- the original snippet was flattened
    to column 0 and was not runnable Python.
    """
    # Update database (system of record) first
    database.update_employee(employee_id, data)

    # Then refresh the cache so readers see the new value immediately
    r.setex(f"employee:{employee_id}", 3600, json.dumps(data))

# Cache invalidation: remove the entry from both stores on delete.
def delete_employee(employee_id):
    """Delete the employee from the database and drop its cache entry.

    NOTE(review): indentation restored from the flattened original.
    """
    # Delete from the system of record first
    database.delete_employee(employee_id)

    # Then invalidate the cache so stale data cannot be served
    r.delete(f"employee:{employee_id}")

# Session storage
def create_user_session(user_id, session_data):
    """Store a session blob in Redis for 30 minutes and return its key.

    NOTE(review): the key embeds a second-resolution timestamp, so two
    sessions created for the same user within one second would collide;
    consider a random token (e.g. secrets.token_hex) for real use.
    """
    session_id = f"session:{user_id}:{int(time.time())}"
    r.setex(session_id, 1800, json.dumps(session_data))  # 30 minutes
    return session_id

# Fixed-window rate limiting
def check_rate_limit(user_id, limit=100, window=3600):
    """Return True while the user is under `limit` calls per `window` seconds.

    NOTE(review): if the process dies between INCR and EXPIRE, the key
    never expires and the user is rate-limited forever; an atomic Lua
    script or a pipeline would close that gap.
    """
    key = f"rate_limit:{user_id}"
    current = r.incr(key)

    # First hit in this window: start the expiry clock.
    if current == 1:
        r.expire(key, window)

    return current <= limit

Database Design Patterns

Repository Pattern Implementation

Python Example

from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
import sqlite3

class EmployeeRepository(ABC):
    """Abstract data-access interface for employee records (Repository pattern).

    Concrete implementations hide the storage technology behind these five
    CRUD-style operations; callers only ever see plain dicts.
    """

    @abstractmethod
    def find_by_id(self, employee_id: int) -> Optional[Dict[str, Any]]:
        """Return the employee row as a dict, or None when no row matches."""

    @abstractmethod
    def find_by_department(self, department_id: int) -> List[Dict[str, Any]]:
        """Return all employees in the given department (possibly empty list)."""

    @abstractmethod
    def save(self, employee: Dict[str, Any]) -> int:
        """Insert a new employee and return the generated primary key."""

    @abstractmethod
    def update(self, employee_id: int, employee: Dict[str, Any]) -> bool:
        """Overwrite an existing employee; return True when a row changed."""

    @abstractmethod
    def delete(self, employee_id: int) -> bool:
        """Delete by primary key; return True when a row was removed."""

class SQLiteEmployeeRepository(EmployeeRepository):
    """SQLite-backed implementation; opens a fresh connection per operation.

    Fix over the original: connections are now explicitly closed. The
    sqlite3 connection context manager only commits/rolls back -- it does
    NOT close the connection, so the original leaked one connection per call.
    (Indentation also restored; the original snippet was flattened.)
    """

    def __init__(self, db_path: str):
        self.db_path = db_path  # path to the SQLite database file

    def _get_connection(self):
        """Open a new connection to the configured database file."""
        return sqlite3.connect(self.db_path)

    def find_by_id(self, employee_id: int) -> Optional[Dict[str, Any]]:
        conn = self._get_connection()
        try:
            conn.row_factory = sqlite3.Row  # name-addressable rows -> easy dict()
            row = conn.execute(
                "SELECT * FROM employees WHERE id = ?",
                (employee_id,)
            ).fetchone()
            return dict(row) if row else None
        finally:
            conn.close()

    def find_by_department(self, department_id: int) -> List[Dict[str, Any]]:
        conn = self._get_connection()
        try:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT * FROM employees WHERE department_id = ?",
                (department_id,)
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def save(self, employee: Dict[str, Any]) -> int:
        conn = self._get_connection()
        try:
            with conn:  # transaction: commit on success, rollback on error
                cursor = conn.execute(
                    "INSERT INTO employees (first_name, last_name, email, department_id, salary) "
                    "VALUES (?, ?, ?, ?, ?)",
                    (
                        employee['first_name'],
                        employee['last_name'],
                        employee['email'],
                        employee['department_id'],
                        employee['salary'],
                    ),
                )
                return cursor.lastrowid
        finally:
            conn.close()

    def update(self, employee_id: int, employee: Dict[str, Any]) -> bool:
        conn = self._get_connection()
        try:
            with conn:
                cursor = conn.execute(
                    "UPDATE employees "
                    "SET first_name = ?, last_name = ?, email = ?, "
                    "    department_id = ?, salary = ? "
                    "WHERE id = ?",
                    (
                        employee['first_name'],
                        employee['last_name'],
                        employee['email'],
                        employee['department_id'],
                        employee['salary'],
                        employee_id,
                    ),
                )
                return cursor.rowcount > 0
        finally:
            conn.close()

    def delete(self, employee_id: int) -> bool:
        conn = self._get_connection()
        try:
            with conn:
                cursor = conn.execute("DELETE FROM employees WHERE id = ?", (employee_id,))
                return cursor.rowcount > 0
        finally:
            conn.close()

# Usage example
# NOTE(review): assumes company.db already contains an employees table;
# sqlite3 will otherwise raise "no such table: employees" on the first query.
repo = SQLiteEmployeeRepository("company.db")
employee = repo.find_by_id(1)
employees = repo.find_by_department(1)

Unit of Work Pattern

class UnitOfWork:
    """Collects pending inserts/updates/deletes and flushes them in commit().

    Fixes over the original: indentation restored (the snippet was
    flattened to column 0), the repository annotation is a forward
    reference so this class does not require EmployeeRepository to be
    defined first, and the exception re-raise preserves the original
    traceback ("raise e" rewrote it).
    """

    def __init__(self, repository: "EmployeeRepository"):
        self.repository = repository
        self.new_objects = []      # pending inserts
        self.dirty_objects = []    # pending updates
        self.removed_objects = []  # pending deletes (ids only)

    def register_new(self, employee: Dict[str, Any]):
        """Queue a brand-new employee for insertion."""
        self.new_objects.append(employee)

    def register_dirty(self, employee: Dict[str, Any]):
        """Queue a modified employee for update (at most once)."""
        if employee not in self.dirty_objects:
            self.dirty_objects.append(employee)

    def register_removed(self, employee_id: int):
        """Queue an employee id for deletion."""
        self.removed_objects.append(employee_id)

    def commit(self):
        """Flush all tracked changes in insert -> update -> delete order.

        On failure the tracking lists are left intact, so a retry re-runs
        every pending operation, including any that already succeeded --
        real rollback would need repository-level transaction support.
        """
        try:
            # Insert new objects
            for employee in self.new_objects:
                self.repository.save(employee)

            # Update dirty objects
            for employee in self.dirty_objects:
                self.repository.update(employee['id'], employee)

            # Delete removed objects
            for employee_id in self.removed_objects:
                self.repository.delete(employee_id)

            # All operations succeeded: reset tracking.
            self.new_objects.clear()
            self.dirty_objects.clear()
            self.removed_objects.clear()

        except Exception:
            # Bare raise keeps the original traceback intact.
            raise

Database Migration Examples

SQL Migration Scripts

-- Migration: Add employee skills table
-- Version: 1.1.0
-- Date: 2024-01-15

-- Create skills table
CREATE TABLE skills (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    category VARCHAR(50),
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create employee_skills junction table (proper many-to-many; no CSV-in-a-column)
CREATE TABLE employee_skills (
    employee_id INT NOT NULL,
    skill_id INT NOT NULL,
    proficiency_level ENUM('Beginner', 'Intermediate', 'Advanced', 'Expert') DEFAULT 'Beginner',
    years_experience INT DEFAULT 0,
    certified BOOLEAN DEFAULT FALSE,
    certified_date DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    PRIMARY KEY (employee_id, skill_id),
    FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE,
    FOREIGN KEY (skill_id) REFERENCES skills(id) ON DELETE CASCADE
);

-- Insert initial skills data
INSERT INTO skills (name, category, description) VALUES
('JavaScript', 'Programming', 'High-level programming language'),
('Python', 'Programming', 'General-purpose programming language'),
('React', 'Frontend', 'JavaScript library for building user interfaces'),
('Node.js', 'Backend', 'JavaScript runtime for server-side development'),
('SQL', 'Database', 'Structured Query Language for database management'),
('MongoDB', 'Database', 'NoSQL document database'),
('Docker', 'DevOps', 'Containerization platform'),
('AWS', 'Cloud', 'Amazon Web Services cloud platform');

-- Add indexes for performance.
-- NOTE(review): the original also created an index on employee_id alone;
-- that index is redundant -- the composite PRIMARY KEY (employee_id, skill_id)
-- already serves employee_id-prefix lookups -- so it is dropped here.
CREATE INDEX idx_employee_skills_skill ON employee_skills(skill_id);
CREATE INDEX idx_employee_skills_proficiency ON employee_skills(proficiency_level);

Database Schema Versioning

class DatabaseMigrator:
    """Applies and rolls back ordered, versioned schema migrations.

    Progress is recorded in a schema_migrations table, which is now created
    on demand -- the original assumed the table already existed and crashed
    on a fresh database. Indentation restored from the flattened snippet.
    Uses DB-API '?' placeholders (sqlite3-style).
    """

    def __init__(self, connection):
        self.connection = connection  # open DB-API connection
        self.migrations = []          # registered migration dicts

    def add_migration(self, version, description, up_sql, down_sql):
        """Register one migration; up_sql applies it, down_sql reverts it."""
        self.migrations.append({
            'version': version,
            'description': description,
            'up_sql': up_sql,
            'down_sql': down_sql
        })

    def _ensure_version_table(self):
        """Create the bookkeeping table on first use (idempotent)."""
        cursor = self.connection.cursor()
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS schema_migrations "
            "(version INTEGER PRIMARY KEY, description TEXT)"
        )
        self.connection.commit()

    def get_current_version(self):
        """Return the highest applied version, or 0 on a fresh database."""
        self._ensure_version_table()
        cursor = self.connection.cursor()
        cursor.execute("SELECT version FROM schema_migrations ORDER BY version DESC LIMIT 1")
        result = cursor.fetchone()
        return result[0] if result else 0

    def migrate_up(self, target_version=None):
        """Apply every unapplied migration up to target_version (or all)."""
        current_version = self.get_current_version()
        migrations_to_run = [
            m for m in self.migrations
            if m['version'] > current_version and
            (target_version is None or m['version'] <= target_version)
        ]

        # Apply in ascending version order; record each one as it lands.
        for migration in sorted(migrations_to_run, key=lambda x: x['version']):
            print(f"Running migration {migration['version']}: {migration['description']}")
            cursor = self.connection.cursor()
            cursor.execute(migration['up_sql'])
            cursor.execute(
                "INSERT INTO schema_migrations (version, description) VALUES (?, ?)",
                (migration['version'], migration['description'])
            )
            self.connection.commit()

    def migrate_down(self, target_version):
        """Roll back applied migrations, newest first, down to target_version."""
        current_version = self.get_current_version()
        migrations_to_rollback = [
            m for m in self.migrations
            if m['version'] > target_version and m['version'] <= current_version
        ]

        # Revert in descending version order so dependencies unwind cleanly.
        for migration in sorted(migrations_to_rollback, key=lambda x: x['version'], reverse=True):
            print(f"Rolling back migration {migration['version']}: {migration['description']}")
            cursor = self.connection.cursor()
            cursor.execute(migration['down_sql'])
            cursor.execute(
                "DELETE FROM schema_migrations WHERE version = ?",
                (migration['version'],)
            )
            self.connection.commit()

# Usage
# NOTE(review): `connection` must be an already-open DB-API connection
# (e.g. sqlite3.connect(...)); it is not defined in this snippet.
migrator = DatabaseMigrator(connection)

# Add migrations
migrator.add_migration(
version=1,
description="Create employees table",
up_sql="CREATE TABLE employees (id INT PRIMARY KEY, name VARCHAR(100))",
down_sql="DROP TABLE employees"
)

migrator.add_migration(
version=2,
description="Add email column to employees",
up_sql="ALTER TABLE employees ADD COLUMN email VARCHAR(100)",
down_sql="ALTER TABLE employees DROP COLUMN email"
)

# Run migrations
migrator.migrate_up()

Performance Optimization Examples

Query Optimization

-- Before: Inefficient query
SELECT e.first_name, e.last_name, d.name as department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;

-- After: Optimized with proper indexing
-- Create covering index
-- (leading salary DESC supports both the range filter and the sort)
CREATE INDEX idx_employee_salary_dept ON employees(salary DESC, department_id, first_name, last_name);

-- Optimized query
-- NOTE(review): LIMIT 100 also changes the result contract -- it caps the
-- output rather than purely speeding up the original, unbounded query.
SELECT e.first_name, e.last_name, d.name as department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC
LIMIT 100;

-- Use EXPLAIN to analyze query plan
EXPLAIN FORMAT=JSON
SELECT e.first_name, e.last_name, d.name as department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;

Connection Pooling

import psycopg2
from psycopg2 import pool
import threading
import time

class DatabaseConnectionPool:
    """Thin wrapper around psycopg2's ThreadedConnectionPool.

    Indentation restored from the flattened snippet; behavior unchanged.
    NOTE(review): ThreadedConnectionPool is itself thread-safe, so
    self.lock is never actually used -- kept only so existing callers
    that reference it keep working.
    """

    def __init__(self, min_connections=5, max_connections=20, **kwargs):
        # kwargs are forwarded to psycopg2 (host, database, user, password, ...)
        self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
            min_connections, max_connections, **kwargs
        )
        self.lock = threading.Lock()

    def get_connection(self):
        """Borrow a connection from the pool (blocks/raises when exhausted)."""
        return self.connection_pool.getconn()

    def return_connection(self, connection):
        """Return a previously borrowed connection to the pool."""
        self.connection_pool.putconn(connection)

    def close_all(self):
        """Close every pooled connection; the pool is unusable afterwards."""
        self.connection_pool.closeall()

# Usage with context manager
class DatabaseContext:
    """Context manager that borrows a pooled connection for a `with` block
    and guarantees it is returned, even when the block body raises.

    (Indentation restored from the flattened original; logic unchanged.)
    """

    def __init__(self, pool):
        self.pool = pool
        self.connection = None  # populated on __enter__

    def __enter__(self):
        self.connection = self.pool.get_connection()
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always hand the connection back; returning None lets exceptions propagate.
        if self.connection:
            self.pool.return_connection(self.connection)

# Example usage
# NOTE(review): never hard-code real credentials; read them from the
# environment or a secrets manager in production code.
pool = DatabaseConnectionPool(
    min_connections=5,
    max_connections=20,
    host='localhost',
    database='company',
    user='admin',
    password='password'
)

def execute_query(query, params=None):
    """Run one query on a pooled connection and return all rows.

    The DatabaseContext guarantees the connection is returned to the
    pool even if execute() raises.
    """
    with DatabaseContext(pool) as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        return cursor.fetchall()

Database Security Examples

SQL Injection Prevention

import sqlite3
import hashlib
import secrets

class SecureDatabase:
    """SQLite access layer demonstrating injection-safe queries and salted
    PBKDF2 password storage.

    Fixes over the original: indentation restored (the snippet was
    flattened) and password-hash comparison made constant-time via
    hmac.compare_digest ('==' leaks timing information).
    """

    def __init__(self, db_path):
        self.connection = sqlite3.connect(db_path)
        self.connection.row_factory = sqlite3.Row  # rows addressable by column name

    def authenticate_user(self, username, password):
        """Return the user row when username/password match, else None."""
        import hmac  # local import keeps this self-contained snippet runnable

        # Use parameterized queries to prevent SQL injection
        cursor = self.connection.cursor()
        cursor.execute(
            "SELECT id, username, password_hash, salt FROM users WHERE username = ?",
            (username,)
        )
        user = cursor.fetchone()

        if user:
            # Re-derive the hash with the stored per-user salt.
            password_hash = self._hash_password(password, user['salt'])
            # Constant-time comparison avoids a timing side-channel.
            if hmac.compare_digest(password_hash, user['password_hash']):
                return user
        return None

    def create_user(self, username, password):
        """Insert a user with a fresh random salt; return the new row id."""
        salt = secrets.token_hex(16)  # 128-bit random salt per user
        password_hash = self._hash_password(password, salt)

        cursor = self.connection.cursor()
        cursor.execute(
            "INSERT INTO users (username, password_hash, salt) VALUES (?, ?, ?)",
            (username, password_hash, salt)
        )
        self.connection.commit()
        return cursor.lastrowid

    def _hash_password(self, password, salt):
        """Hex PBKDF2-HMAC-SHA256 digest of the password (100,000 iterations)."""
        return hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000).hex()

    def search_employees_safe(self, search_term):
        """Name search via bound parameters; the LIKE wildcards live in the
        bound value, never in the SQL text itself."""
        cursor = self.connection.cursor()
        cursor.execute(
            "SELECT * FROM employees WHERE first_name LIKE ? OR last_name LIKE ?",
            (f"%{search_term}%", f"%{search_term}%")
        )
        return cursor.fetchall()

# Example of what NOT to do (vulnerable to SQL injection)
def search_employees_unsafe(connection, search_term):
    """Anti-example: NEVER build SQL via string interpolation.

    A search_term like "x' OR '1'='1" changes the query's meaning
    entirely. Kept (with indentation restored) purely to contrast with
    the parameterized search above.
    """
    cursor = connection.cursor()
    # This is vulnerable to SQL injection!
    query = f"SELECT * FROM employees WHERE first_name LIKE '%{search_term}%'"
    cursor.execute(query)
    return cursor.fetchall()

Data Encryption

from cryptography.fernet import Fernet
import base64
import os

class DatabaseEncryption:
    """Field-level encryption helper built on Fernet (symmetric, authenticated).

    Fixes over the original: indentation restored (the snippet was
    flattened) and the key file is chmod'd to owner-only on creation.
    NOTE(review): requires the third-party `cryptography` package imported
    at the top of this snippet.
    """

    def __init__(self):
        # Load the persisted key, or create one on first run.
        self.key = self._get_or_create_key()
        self.cipher = Fernet(self.key)

    def _get_or_create_key(self):
        """Return the Fernet key from encryption.key, generating it if absent.

        Anyone who can read this file can decrypt every stored field, so
        new key files are restricted to the owner (0o600).
        """
        key_file = 'encryption.key'
        if os.path.exists(key_file):
            with open(key_file, 'rb') as f:
                return f.read()
        key = Fernet.generate_key()
        with open(key_file, 'wb') as f:
            f.write(key)
        os.chmod(key_file, 0o600)  # limited effect on Windows ACLs
        return key

    def encrypt_sensitive_data(self, data):
        """Encrypt a str or bytes value; returns Fernet token bytes."""
        if isinstance(data, str):
            data = data.encode()
        return self.cipher.encrypt(data)

    def decrypt_sensitive_data(self, encrypted_data):
        """Decrypt a Fernet token and return the plaintext as str."""
        decrypted = self.cipher.decrypt(encrypted_data)
        return decrypted.decode()

    def store_encrypted_employee(self, connection, employee_data):
        """Insert an employee, encrypting the SSN and salary columns."""
        cursor = connection.cursor()

        # Encrypt sensitive fields; salary is stringified before encryption
        # and converted back to float on retrieval.
        encrypted_ssn = self.encrypt_sensitive_data(employee_data['ssn'])
        encrypted_salary = self.encrypt_sensitive_data(str(employee_data['salary']))

        cursor.execute("""
            INSERT INTO employees (first_name, last_name, email, encrypted_ssn, encrypted_salary)
            VALUES (?, ?, ?, ?, ?)
        """, (
            employee_data['first_name'],
            employee_data['last_name'],
            employee_data['email'],
            encrypted_ssn,
            encrypted_salary
        ))
        connection.commit()

    def retrieve_encrypted_employee(self, connection, employee_id):
        """Fetch one employee by id and decrypt the sensitive fields.

        NOTE(review): rows are indexed by column name, so the caller's
        connection must have row_factory = sqlite3.Row (or an equivalent
        mapping row type) -- confirm before use.
        """
        cursor = connection.cursor()
        cursor.execute(
            "SELECT * FROM employees WHERE id = ?",
            (employee_id,)
        )
        row = cursor.fetchone()

        if row is None:
            return None

        return {
            'id': row['id'],
            'first_name': row['first_name'],
            'last_name': row['last_name'],
            'email': row['email'],
            'ssn': self.decrypt_sensitive_data(row['encrypted_ssn']),
            'salary': float(self.decrypt_sensitive_data(row['encrypted_salary'])),
        }

These examples demonstrate practical database operations, design patterns, and security measures that are commonly used in real-world applications.